package com.lagou.rpc.consumer.server;

import com.alibaba.fastjson.JSONObject;
import com.lagou.rpc.consumer.client.RpcClient;
import com.lagou.rpc.consumer.proxy.RpcClientProxy;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Consumer-side ZooKeeper integration built on Apache Curator.
 *
 * <p>{@link #initListener()} connects to ZooKeeper and watches {@code /lg-curator}
 * (inside the {@code base} namespace): every direct child whose path does not end
 * in {@code responseTime} is read as a JSON document with {@code ip} and
 * {@code port} fields and is mirrored into {@link RpcClientProxy} via
 * {@code addClient}/{@code removeClient}.
 *
 * <p>{@link #initReport()} starts a periodic task that calls
 * {@link #updateResponseTime(RpcClient)} for every client returned by
 * {@code RpcClientProxy.listRpcClient()}.
 *
 * @author bwcx_jzy
 * @since 2022/2/17
 */
@Service
public class ZookeeperServer {

    private static final String HOST = "172.19.107.2:2181";

    private static final String PARENT_PATH = "/lg-curator";

    /** Paths ending in this suffix hold response-time statistics, not provider registrations. */
    private static final String RESPONSE_TIME_SUFFIX = "responseTime";

    /** Interval between two reporting runs, in milliseconds. */
    private static final long REPORT_PERIOD_MS = 5 * 1000;

    /** A client whose last request is older than this many milliseconds is treated as expired. */
    private static final long STALE_AFTER_MS = 5 * 1000;

    /**
     * Assigned in {@link #initListener()} and read from the timer thread started by
     * {@link #initReport()}, hence volatile.
     */
    private volatile CuratorFramework client;

    /**
     * Connects to ZooKeeper and starts a {@link CuratorCache} on {@code /lg-curator}
     * that keeps {@link RpcClientProxy} in sync with created and deleted provider nodes.
     */
    public void initListener() {
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        // Fluent builder style.
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(HOST)
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(exponentialBackoffRetry)
                // Isolated namespace: every path below is relative to /base.
                .namespace("base")
                .build();
        curator.start();
        client = curator;

        CuratorCache cache = CuratorCache.build(curator, PARENT_PATH);
        cache.listenable().addListener((type, oldData, data) -> {
            if (type == CuratorCacheListener.Type.NODE_CREATED) {
                JSONObject provider = parseProviderNode(data.getPath(), data.getData());
                if (provider == null) {
                    return;
                }
                try {
                    RpcClientProxy.addClient(provider.getString("ip"), provider.getIntValue("port"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else if (type == CuratorCacheListener.Type.NODE_DELETED) {
                JSONObject provider = parseProviderNode(oldData.getPath(), oldData.getData());
                if (provider == null) {
                    return;
                }
                RpcClientProxy.removeClient(provider.getString("ip"), provider.getIntValue("port"));
            } else if (type == CuratorCacheListener.Type.NODE_CHANGED) {
                System.out.println(data.getPath());
            }
        });
        cache.start();
    }

    /**
     * Decodes the JSON payload of a provider registration node.
     *
     * @param nodePath path of the node that triggered the cache event
     * @param payload  raw node data, may be {@code null}
     * @return the parsed JSON object, or {@code null} when the node is a
     *         response-time node, is not a direct child of the watched root,
     *         or carries no parsable JSON
     */
    private static JSONObject parseProviderNode(String nodePath, byte[] payload) {
        if (nodePath.endsWith(RESPONSE_TIME_SUFFIX)) {
            return null;
        }
        // "/lg-curator/<child>" -> ["lg-curator", "<child>"]; any other depth is ignored.
        String[] segments = nodePath.substring(1).split("/");
        if (segments.length != 2) {
            return null;
        }
        if (payload == null || payload.length == 0) {
            // A node created without data cannot describe a provider.
            return null;
        }
        try {
            return JSONObject.parseObject(new String(payload, StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            // Malformed registration data must not propagate out of the cache listener.
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Starts a timer that reports the response time of every known RPC client,
     * first immediately and then at a fixed rate of {@link #REPORT_PERIOD_MS}.
     */
    public void initReport() {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    List<RpcClient> rpcClients = RpcClientProxy.listRpcClient();
                    if (rpcClients == null) {
                        return;
                    }
                    for (RpcClient rpcClient : rpcClients) {
                        updateResponseTime(rpcClient);
                    }
                } catch (RuntimeException e) {
                    // An exception escaping run() terminates the Timer thread for good,
                    // which would silently stop all further reports.
                    e.printStackTrace();
                }
            }
        };
        Timer timer = new Timer();
        long delay = 0;
        timer.scheduleAtFixedRate(task, delay, REPORT_PERIOD_MS);
    }

    /**
     * Publishes the response-time statistics of one client to the ephemeral node
     * {@code /lg-curator/<ip>-<port>-responseTime}.
     *
     * <p>Nothing is written when the client has no recorded response time or when
     * {@link #initListener()} has not created the ZooKeeper client yet. When the
     * statistics node already exists and the client's last request is older than
     * {@link #STALE_AFTER_MS}, both the statistics node and the client's
     * {@code /lg-curator/<ip>-<port>} node are deleted instead. Failures are printed
     * and otherwise ignored.
     *
     * @param rpcClient the client to report; {@code null} is ignored
     */
    public void updateResponseTime(RpcClient rpcClient) {
        CuratorFramework curator = client;
        if (curator == null || rpcClient == null) {
            return;
        }
        try {
            long responseTime = rpcClient.getResponseTime();
            if (responseTime == 0) {
                // Clients that have not served a request yet do not take part.
                return;
            }
            long timeMillis = System.currentTimeMillis();
            // Read once so the published value and the staleness check agree.
            long lastReqTime = rpcClient.getLastReqTime();
            String nodePath = PARENT_PATH + "/" + rpcClient.getIp() + "-" + rpcClient.getPort();
            String allPath = nodePath + "-" + RESPONSE_TIME_SUFFIX;

            JSONObject jsonObject = new JSONObject();
            jsonObject.put("responseTime", responseTime);
            jsonObject.put("timeMillis", timeMillis);
            jsonObject.put("lastReqTime", lastReqTime);
            byte[] payload = jsonObject.toString().getBytes(StandardCharsets.UTF_8);

            Stat stat = curator.checkExists().forPath(allPath);
            if (stat == null) {
                // Node is missing: create it. orSetData() covers the case where it
                // appears between the existence check and the create call.
                curator.create()
                        .orSetData()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(allPath, payload);
                return;
            }
            if (timeMillis - lastReqTime > STALE_AFTER_MS) {
                // Node expired. quietly() keeps an already-missing statistics node
                // from preventing the second delete.
                curator.delete().quietly().forPath(allPath);
                curator.delete().quietly().forPath(nodePath);
                return;
            }
            // Refresh the published statistics.
            curator.setData().forPath(allPath, payload);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
